feat(csv): split GeoJSON and PMTiles into low-priority RQ jobs #408

bolinocroustibat wants to merge 7 commits into geojson-from-db
Conversation
```python
import logging
import os
import tempfile
from pathlib import Path

import aiohttp

log = logging.getLogger(__name__)


async def download_url_to_tempfile(url: str, suffix: str = "") -> Path:
    """Download a URL to a named temporary file and return its path."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            resp.raise_for_status()
            body = await resp.read()
    fd, raw_path = tempfile.mkstemp(suffix=suffix)
    path = Path(raw_path)
    try:
        os.write(fd, body)
    finally:
        os.close(fd)
    log.debug(f"Downloaded {url} to {path} ({len(body)} bytes)")
    return path
```
Is it possible to stream to a file instead of loading the whole file into RAM before writing it to disk?
It's possible; we are already doing it in utils/files.py -> download_resource. We could use a shared function for the two.
Refactored utils/files.py to use streaming HTTP for download_url_to_tempfile, and also relocated it and unified it with download_resource: 357dfb0
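The chunked-write pattern behind that refactor can be sketched with the stdlib alone. Here `io.BytesIO` stands in for the HTTP response stream, and `CHUNK_SIZE` and the `max_bytes` cap are illustrative assumptions mirroring the PR's `HTTP_DOWNLOAD_CHUNK_SIZE` and total-bytes cap; the actual code streams via aiohttp rather than a blocking reader:

```python
import io
import os
import tempfile
from pathlib import Path

CHUNK_SIZE = 8192  # hypothetical stand-in for the PR's HTTP_DOWNLOAD_CHUNK_SIZE


def stream_to_tempfile(stream, suffix: str = "", max_bytes=None) -> Path:
    """Copy a stream to a named temp file chunk by chunk, so the whole
    body is never held in RAM."""
    fd, raw_path = tempfile.mkstemp(suffix=suffix)
    path = Path(raw_path)
    total = 0
    ok = False
    try:
        while chunk := stream.read(CHUNK_SIZE):
            total += len(chunk)
            if max_bytes is not None and total > max_bytes:
                raise IOError(f"stream exceeded {max_bytes} bytes")
            os.write(fd, chunk)
        ok = True
    finally:
        os.close(fd)
        if not ok:
            path.unlink(missing_ok=True)  # do not leave partial files behind

    return path


path = stream_to_tempfile(io.BytesIO(b"x" * 100_000), suffix=".csv")
```

With aiohttp the read loop would instead be `async for chunk in resp.content.iter_chunked(CHUNK_SIZE)`, but the write/cap/cleanup structure stays the same.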
```python
record = await Check.get_by_id(check_id, with_deleted=True)
if not record:
    log.error(f"task_csv_to_geojson: check {check_id} not found")
    return
```
Check.get_by_id returns the check only if it is the latest one for this resource. If a new check arrives before this job starts, the job will not run. Not sure whether that's a problem in practice, but it's worth noting that if the low queue has trouble draining, we may never run this task (if new checks for this resource keep arriving before the queue catches up).
Geo export runs as chained tasks on the low queue; GeoJSON is generated from the PostgreSQL parsing table when CSV_TO_DB is enabled. RQ entrypoint uses context for exception-queue routing.

Made-with: Cursor
# Conflicts:
#	udata_hydra/analysis/geojson.py
Co-authored-by: Thibaud Ollagnier <ThibaudDauce@users.noreply.github.com>
…url_to_tempfile

- Add _http_get_to_temp_path (Path, total_bytes cap, shared session options)
- Factor download_resource and download_url_to_tempfile through it
- GeoJSON→PMTiles: headers, max_size_allowed, IOException handling
- download_file: BinaryIO + HTTP_DOWNLOAD_CHUNK_SIZE; remove_remainders uses Path.unlink

Made-with: Cursor
Closes #412 (and duplicate datagouv/data.gouv.fr#1991).
This is built on #404 where GeoJSON is generated from the PostgreSQL parsing table (streaming from the DB) instead of re-reading the CSV file.
Main changes:

- GeoJSON and PMTiles now run as separate RQ jobs on the `low` queue, so scheduling can treat them differently from CSV-to-DB ingest. When `CSV_TO_DB` is enabled, geo work no longer depends on the downloaded temp file still being on disk.
- The previous single `geojson.py` module is split into three smaller modules, so CSV export, PMTiles conversion, and the remaining orchestration are easier to follow and test.
- Removed the `CSV_TO_GEOJSON` config flag: since GeoJSON for CSVs now comes from the Postgres parsing table rather than from re-reading the file, this old flag was redundant. One switch (`DB_TO_GEOJSON`) is enough to turn that on or off.
- Refactored `utils/files.py` to use streaming HTTP for `download_url_to_tempfile`, and relocated and unified it with `download_resource`: 357dfb0
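The job split above (a GeoJSON-export job, then a PMTiles job that runs only after it, both on a low-priority queue) can be illustrated without a Redis backend. This is a toy stand-in for RQ's `depends_on` chaining, not the PR's actual wiring; the job names `csv_to_geojson` and `geojson_to_pmtiles` are assumptions:

```python
from collections import deque
from typing import Callable


class MiniQueue:
    """Toy FIFO queue illustrating RQ-style chained jobs: a job runs only
    after the job it depends on has finished."""

    def __init__(self) -> None:
        self._jobs: deque = deque()
        self.finished: list = []

    def enqueue(self, name: str, func: Callable, depends_on=None) -> str:
        self._jobs.append((name, func, depends_on))
        return name

    def work(self) -> None:
        while self._jobs:
            name, func, dep = self._jobs.popleft()
            if dep is not None and dep not in self.finished:
                self._jobs.append((name, func, dep))  # dependency not done: requeue
                continue
            func()
            self.finished.append(name)


low = MiniQueue()
g = low.enqueue("csv_to_geojson", lambda: print("export GeoJSON from parsing table"))
low.enqueue("geojson_to_pmtiles", lambda: print("convert GeoJSON to PMTiles"), depends_on=g)
low.work()
```

With real RQ this maps to `job = low_queue.enqueue(csv_to_geojson, ...)` followed by `low_queue.enqueue(geojson_to_pmtiles, ..., depends_on=job)`, and the worker enforces the ordering.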
Suggested for later, separate PRs:

- `task_*` RQ entrypoint names across the codebase
- A new name for `geojson.py` reflecting what that file still owns now that CSV GeoJSON and PMTiles live in their own modules (what about `geo_analysis.py`?)